package com.mti.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;

import java.util.Arrays;
import java.util.Properties;

/**
 * Minimal Kafka consumer demo: subscribes to a single topic, prints every
 * 100th record it receives and stops after 1000 records.
 *
 * <p>The underlying {@link KafkaConsumer} lives in a static field because the
 * static {@link #getMessage()} helper needs access to it. As a consequence,
 * constructing a second {@code Consumer} replaces the client shared by every
 * instance, so only one instance per JVM is supported.
 *
 * <p>NOTE(review): {@code KafkaConsumer} is documented by Kafka as not safe for
 * multi-threaded access; do not call {@link #getMessage()} while {@link #run()}
 * is polling on another thread.
 *
 * @author zhaichen
 * @since 2020-06-12 15:27
 */
public class Consumer implements Runnable {
    /** Consumer group every instance of this demo joins. */
    private static final String GROUPID = "groupA";

    /** System property consulted by {@link #Consumer(String)} for the broker list. */
    private static final String SERVERS_PROPERTY = "kafka.servers";

    /** Broker list used when {@value #SERVERS_PROPERTY} is not set. */
    private static final String DEFAULT_SERVERS = "localhost:9092";

    /** Maximum time, in milliseconds, a single poll blocks waiting for records. */
    private static final long POLL_TIMEOUT_MS = 1000L;

    /** Pause, in milliseconds, after a poll that returned no records. */
    private static final long IDLE_SLEEP_MS = 1000L;

    /** A record is printed each time the running count is a multiple of this. */
    private static final int PRINT_EVERY = 100;

    /** Number of records after which {@link #run()} stops consuming. */
    private static final int MAX_MESSAGES = 1000;

    /** Shared client; static only because {@link #getMessage()} is static. */
    private static KafkaConsumer<String, String> consumer;

    /** Topic this instance subscribed to. */
    private final String topic;

    /**
     * Creates a consumer for {@code topicName}, taking the bootstrap servers from
     * the {@code kafka.servers} system property and falling back to
     * {@code localhost:9092} when the property is absent.
     *
     * <p>The previous implementation read the servers from a Spring
     * {@code @Value} field inside the constructor. Field injection can only
     * happen after construction, so the value was always {@code null} here and
     * {@code Properties.put} failed with a {@code NullPointerException}.
     *
     * @param topicName topic to subscribe to; must not be {@code null} or empty
     * @throws IllegalArgumentException if {@code topicName} is {@code null} or empty
     */
    public Consumer(String topicName) {
        this(topicName, System.getProperty(SERVERS_PROPERTY, DEFAULT_SERVERS));
    }

    /**
     * Creates a consumer for {@code topicName} connected to the given brokers
     * and subscribes it immediately.
     *
     * @param topicName topic to subscribe to; must not be {@code null} or empty
     * @param servers   value for {@code bootstrap.servers} (comma-separated
     *                  {@code host:port} list); must not be {@code null} or empty
     * @throws IllegalArgumentException if either argument is {@code null} or empty
     */
    public Consumer(String topicName, String servers) {
        if (topicName == null || topicName.isEmpty()) {
            throw new IllegalArgumentException("topicName must not be null or empty");
        }
        if (servers == null || servers.isEmpty()) {
            // Properties is a Hashtable: a null value would fail with a bare NPE.
            throw new IllegalArgumentException("bootstrap servers must not be null or empty");
        }

        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        this.topic = topicName;
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(this.topic));
    }

    /**
     * Polls the shared consumer once, waiting up to one second, and discards
     * whatever records are returned.
     *
     * <p>Requires that a {@code Consumer} has been constructed beforehand;
     * otherwise the shared client is still {@code null}.
     */
    public static void getMessage() {
        // NOTE(review): poll(long) is deprecated in newer kafka-clients in favour of
        // poll(Duration); kept because the client version in use is not visible here.
        consumer.poll(POLL_TIMEOUT_MS);
    }

    /**
     * Consumes records until {@code MAX_MESSAGES} have been seen or the thread is
     * interrupted while idle, then closes the shared consumer.
     *
     * <p>Every {@code PRINT_EVERY}-th record is printed to standard output.
     */
    @Override
    public void run() {
        int messageNo = 1;
        System.out.println("---------开始消费---------");
        try {
            polling:
            for (;;) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                if (records == null || records.count() == 0) {
                    Thread.sleep(IDLE_SLEEP_MS);
                    continue;
                }
                for (ConsumerRecord<String, String> record : records) {
                    // Print one record per PRINT_EVERY consumed; which record that is
                    // depends on how the broker happens to batch the data.
                    if (messageNo % PRINT_EVERY == 0) {
                        System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
                    }
                    // Stop consuming altogether once the limit is reached. A plain
                    // `break` would only leave this inner loop and the outer loop
                    // would keep polling forever, so the consumer was never closed.
                    // NOTE(review): with auto-commit on, records left in this batch
                    // are presumably skipped for the group — confirm that is acceptable.
                    if (messageNo >= MAX_MESSAGES) {
                        break polling;
                    }
                    messageNo++;
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag so whoever owns this thread can observe the interrupt.
            Thread.currentThread().interrupt();
            System.err.println("Consumer interrupted while idle; shutting down");
        } finally {
            consumer.close();
        }
    }

    /**
     * Starts a consumer for the {@code test} topic on a new thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Consumer test = new Consumer("test");
        Thread thread = new Thread(test);
        thread.start();
    }
}
